/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

/**
 * Extends the AbstractRequestInterceptor and provides an implementation for
 * federation of YARN RM and scaling an application across multiple YARN
 * sub-clusters. All the federation specific implementation is encapsulated in
 * this class. This is always the last interceptor in the chain.
 */
public class FederationInterceptor extends AbstractRequestInterceptor {
  private static final Logger LOG =
      LoggerFactory.getLogger(FederationInterceptor.class);

  public static final String NMSS_CLASS_PREFIX = "FederationInterceptor/";

  public static final String NMSS_REG_REQUEST_KEY =
      NMSS_CLASS_PREFIX + "registerRequest";
  public static final String NMSS_REG_RESPONSE_KEY =
      NMSS_CLASS_PREFIX + "registerResponse";

  /**
   * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
   * Registry. Otherwise, if NM recovery is enabled, the UAM token are stored in
   * local NMSS instead under this directory name.
   */
  public static final String NMSS_SECONDARY_SC_PREFIX =
      NMSS_CLASS_PREFIX + "secondarySC/";
  public static final String STRING_TO_BYTE_FORMAT = "UTF-8";

  private static final RecordFactory RECORD_FACTORY =
      RecordFactoryProvider.getRecordFactory(null);

  /**
   * From AM's perspective, FederationInterceptor behaves exactly the same as
   * YarnRM (ApplicationMasterService). This is to remember the last heart beat
   * response, used to handle duplicate heart beat and responseId from AM.
   */
  private AllocateResponse lastAllocateResponse;
  private final Object lastAllocateResponseLock = new Object();

  private ApplicationAttemptId attemptId;

  /**
   * The home sub-cluster is the sub-cluster where the AM container is running
   * in.
   */
  private AMRMClientRelayer homeRMRelayer;
  private SubClusterId homeSubClusterId;
  private AMHeartbeatRequestHandler homeHeartbeatHandler;

  /**
   * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
   * using subClusterId as uamId. One UAM is created per sub-cluster RM except
   * the home RM.
   *
   * Creation and register of UAM in secondary sub-clusters happen on-demand,
   * when AMRMProxy policy routes resource request to these sub-clusters for the
   * first time. AM heart beats to them are also handled asynchronously for
   * performance reasons.
   */
  private final UnmanagedAMPoolManager uamPool;

  /**
   * The rmProxy relayers for secondary sub-clusters that keep track of all
   * pending requests.
   */
  private final Map<String, AMRMClientRelayer> secondaryRelayers;

  /**
   * Stores the AllocateResponses that are received asynchronously from all the
   * sub-cluster resource managers, including home RM, but not merged and
   * returned back to AM yet.
   */
  private final Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;

  /**
   * Remembers the last allocate response from all known sub-clusters. This is
   * used together with sub-cluster timeout to assemble entries about
   * cluster-wide info (e.g. AvailableResource, NumClusterNodes) in the allocate
   * response back to AM.
   */
  private final Map<SubClusterId, AllocateResponse> lastSCResponse;

  /**
   * The async UAM registration result that is not consumed yet.
   */
  private final Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations;

  // For unit test synchronization
  private final Map<SubClusterId, Future<?>> uamRegisterFutures;

  /** Thread pool used for asynchronous operations. */
  private ExecutorService threadpool;

  /**
   * A flag for work preserving NM restart. If we just recovered, we need to
   * generate an {@link ApplicationMasterNotRegisteredException} exception back
   * to AM (similar to what RM will do after its restart/fail-over) in its next
   * allocate to trigger AM re-register (which we will shield from RM and just
   * return our saved register response) and a full pending requests re-send, so
   * that all the {@link AMRMClientRelayer} will be re-populated with all
   * pending requests.
   *
   */
  private volatile boolean justRecovered;

  /** if true, allocate will be no-op, skipping actual processing. */
  private volatile boolean finishAMCalled;

  /**
   * Used to keep track of the container Id and the sub cluster RM that created
   * the container, so that we know which sub-cluster to forward later requests
   * about existing containers to.
   */
  private final Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap;

  /**
   * The original registration request that was sent by the AM. This instance is
   * reused to register/re-register with all the sub-cluster RMs.
   */
  private RegisterApplicationMasterRequest amRegistrationRequest;

  /**
   * The original registration response returned to AM. This instance is reused
   * for duplicate register request from AM, triggered by timeout between AM and
   * AMRMProxy.
   */
  private RegisterApplicationMasterResponse amRegistrationResponse;

  private FederationStateStoreFacade federationFacade;

  private SubClusterResolver subClusterResolver;

  /**
   * Records the last time a successful heartbeat response received from a known
   * sub-cluster. lastHeartbeatTimeStamp.keySet() should be in sync with
   * uamPool.getAllUAMIds().
   */
  private Map<SubClusterId, Long> lastSCResponseTime;
  private long subClusterTimeOut;

  private long lastAMHeartbeatTime;

  /** The policy used to split requests among sub-clusters. */
  private FederationAMRMProxyPolicy policyInterpreter;

  private FederationRegistryClient registryClient;

  // the maximum wait time for the first async heart beat response
  private long heartbeatMaxWaitTimeMs;

  private int registerUamRetryNum;

  private long registerUamRetryInterval;

  private boolean waitUamRegisterDone;

  private final MonotonicClock clock = new MonotonicClock();

  /*
   * For UAM, keepContainersAcrossApplicationAttempts is always true.
   * When re-register to RM, RM will clear node set and regenerate NMToken for transferred
   * container. But If keepContainersAcrossApplicationAttempts of AM is false, AM may not
   * called getNMTokensFromPreviousAttempts, so the NMToken which is pass from
   * RegisterApplicationMasterResponse will be missing. Here we cache these NMToken,
   * then pass to AM in allocate stage.
   * */
  private Set<NMToken> nmTokenMapFromRegisterSecondaryCluster;

  /**
   * Creates an instance of the FederationInterceptor class.
   */
  public FederationInterceptor() {
    this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
    this.asyncResponseSink = new ConcurrentHashMap<>();
    this.lastSCResponse = new ConcurrentHashMap<>();
    this.uamRegistrations = new ConcurrentHashMap<>();
    this.uamRegisterFutures = new ConcurrentHashMap<>();
    this.threadpool = Executors.newCachedThreadPool();
    this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
    this.secondaryRelayers = new ConcurrentHashMap<>();
    this.amRegistrationRequest = null;
    this.amRegistrationResponse = null;
    this.justRecovered = false;
    this.finishAMCalled = false;
    this.lastSCResponseTime = new ConcurrentHashMap<>();
    this.lastAMHeartbeatTime = this.clock.getTime();
    this.nmTokenMapFromRegisterSecondaryCluster = new ConcurrentHashSet<>();
  }

  /**
   * Initializes the instance using specified context.
   */
  @Override
  public void init(AMRMProxyApplicationContext appContext) {
    super.init(appContext);
    LOG.info("Initializing Federation Interceptor");

    // Update the conf if available
    Configuration conf = appContext.getConf();
    if (conf == null) {
      conf = getConf();
    } else {
      setConf(conf);
    }

    // The proxy ugi used to talk to home RM as well as Yarn Registry, loaded
    // with the up-to-date AMRMToken issued by home RM.
    UserGroupInformation appOwner;
    try {
      appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
          UserGroupInformation.getCurrentUser());
    } catch (Exception ex) {
      throw new YarnRuntimeException(ex);
    }

    if (appContext.getRegistryClient() != null) {
      this.registryClient = new FederationRegistryClient(conf,
          appContext.getRegistryClient(), appOwner);
      // Add all app tokens for Yarn Registry access
      if (appContext.getCredentials() != null) {
        appOwner.addCredentials(appContext.getCredentials());
      }
    }

    this.attemptId = appContext.getApplicationAttemptId();
    ApplicationId appId = this.attemptId.getApplicationId();
    this.homeSubClusterId =
        SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
    this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
        ApplicationMasterProtocol.class, appOwner), appId,
        this.homeSubClusterId.toString(), conf);

    this.homeHeartbeatHandler =
        createHomeHeartbeatHandler(conf, appId, this.homeRMRelayer);
    this.homeHeartbeatHandler.setUGI(appOwner);
    this.homeHeartbeatHandler.setDaemon(true);
    this.homeHeartbeatHandler.start();

    // set lastResponseId to -1 before application master registers
    this.lastAllocateResponse =
        RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
    this.lastAllocateResponse
        .setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);

    this.federationFacade = FederationStateStoreFacade.getInstance(conf);
    this.subClusterResolver = this.federationFacade.getSubClusterResolver();

    // AMRMProxyPolicy will be initialized in registerApplicationMaster
    this.policyInterpreter = null;

    this.uamPool.init(conf);
    this.uamPool.start();

    this.heartbeatMaxWaitTimeMs =
        conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS,
            YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);

    this.subClusterTimeOut =
        conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
            YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
    if (this.subClusterTimeOut <= 0) {
      LOG.info(
          "{} configured to be {}, should be positive. Using default of {}.",
          YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
          this.subClusterTimeOut,
          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
      this.subClusterTimeOut =
          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
    }

    this.registerUamRetryNum = conf.getInt(
        YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
    if (this.registerUamRetryNum <= 0) {
      LOG.info("{} configured to be {}, should be positive. Using default of {}.",
          YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
          this.subClusterTimeOut,
          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
      this.registerUamRetryNum =
          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT;
    }

    this.registerUamRetryInterval = conf.getTimeDuration(
        YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
        TimeUnit.MILLISECONDS);

    this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
        YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
  }

  @Override
  public void recover(Map<String, byte[]> recoveredDataMap) {
    super.recover(recoveredDataMap);
    LOG.info("Recovering data for FederationInterceptor for {}.", this.attemptId);
    this.justRecovered = true;

    if (recoveredDataMap == null || recoveredDataMap.isEmpty()) {
      LOG.warn("recoveredDataMap isNull Or isEmpty, FederationInterceptor can't recover.");
      return;
    }

    if (!recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
      return;
    }

    try {

      if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
        byte[] appMasterRequestBytes = recoveredDataMap.get(NMSS_REG_REQUEST_KEY);
        RegisterApplicationMasterRequestProto pb =
            RegisterApplicationMasterRequestProto.parseFrom(appMasterRequestBytes);
        this.amRegistrationRequest = new RegisterApplicationMasterRequestPBImpl(pb);
        LOG.info("amRegistrationRequest recovered for {}.", this.attemptId);
        // Give the register request to homeRMRelayer for future re-registration
        this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
      }

      if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
        byte[] appMasterResponseBytes = recoveredDataMap.get(NMSS_REG_RESPONSE_KEY);
        RegisterApplicationMasterResponseProto pb =
            RegisterApplicationMasterResponseProto.parseFrom(appMasterResponseBytes);
        this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(pb);
        LOG.info("amRegistrationResponse recovered for {}.", this.attemptId);
      }

      // Recover UAM amrmTokens from registry or NMSS
      Map<String, Token<AMRMTokenIdentifier>> uamMap =
          recoverSubClusterAMRMTokenIdentifierMap(recoveredDataMap);

      // Re-attach the UAMs
      int containers = 0;
      AMRMProxyApplicationContext applicationContext = getApplicationContext();
      ApplicationId applicationId = this.attemptId.getApplicationId();
      String queue = this.amRegistrationResponse.getQueue();
      String homeSCId = this.homeSubClusterId.getId();
      String user = applicationContext.getUser();

      for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
        String keyScId = entry.getKey();
        Token<AMRMTokenIdentifier> tokens = entry.getValue();
        SubClusterId subClusterId = SubClusterId.newInstance(keyScId);

        // Create a config loaded with federation on and subClusterId
        // for each UAM
        YarnConfiguration config = new YarnConfiguration(getConf());
        FederationProxyProviderUtil.updateConfForFederation(config, keyScId);

        try {
          ApplicationSubmissionContext originalSubmissionContext =
              federationFacade.getApplicationSubmissionContext(applicationId);

          // ReAttachUAM
          this.uamPool.reAttachUAM(keyScId, config, applicationId, queue, user, homeSCId,
              tokens, keyScId, originalSubmissionContext);

          // GetAMRMClientRelayer
          this.secondaryRelayers.put(keyScId, this.uamPool.getAMRMClientRelayer(keyScId));

          // RegisterApplicationMaster
          RegisterApplicationMasterResponse response =
              this.uamPool.registerApplicationMaster(keyScId, this.amRegistrationRequest);
          nmTokenMapFromRegisterSecondaryCluster.addAll(response.getNMTokensFromPreviousAttempts());

          // Set sub-cluster to be timed out initially
          lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);

          // Running containers from secondary RMs
          List<Container> previousAttempts = response.getContainersFromPreviousAttempts();
          for (Container container : previousAttempts) {
            ContainerId containerId = container.getId();
            containerIdToSubClusterIdMap.put(containerId, subClusterId);
            containers++;
            LOG.info("From subCluster {} running container {}", subClusterId, containerId);
          }

          LOG.info("Recovered {} running containers from UAM in {}.",
              previousAttempts.size(), subClusterId);

        } catch (Exception e) {
          LOG.error("Error reattaching UAM to {} for {}.", subClusterId, this.attemptId, e);
          // During recovery, we need to clean up the data of the bad SubCluster.
          // This ensures that when the bad SubCluster is recovered,
          // new Containers can still be allocated and new UAMs can be registered.
          this.uamPool.unAttachUAM(keyScId);
          this.secondaryRelayers.remove(keyScId);
          this.lastSCResponseTime.remove(subClusterId);
          List<ContainerId> containerIds =
              containerIdToSubClusterIdMap.entrySet().stream()
              .filter(item-> item.getValue().equals(subClusterId))
              .map(Entry::getKey)
              .collect(Collectors.toList());
          for (ContainerId containerId : containerIds) {
            containerIdToSubClusterIdMap.remove(containerId);
          }
        }
      }

      // Get the running containers from home RM, note that we will also get the
      // AM container itself from here. We don't need it, but no harm to put the
      // map as well.
      UserGroupInformation appSubmitter;
      if (UserGroupInformation.isSecurityEnabled()) {
        appSubmitter = UserGroupInformation.createProxyUser(user,
            UserGroupInformation.getLoginUser());
      } else {
        appSubmitter = UserGroupInformation.createRemoteUser(user);
      }

      ApplicationClientProtocol rmClient = createHomeRMProxy(applicationContext,
          ApplicationClientProtocol.class, appSubmitter);

      GetContainersRequest request = GetContainersRequest.newInstance(this.attemptId);
      GetContainersResponse response = rmClient.getContainers(request);

      for (ContainerReport container : response.getContainerList()) {
        ContainerId containerId = container.getContainerId();
        containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
        containers++;
        LOG.debug("From home RM {} running container {}.", this.homeSubClusterId, containerId);
      }
      LOG.info("{} running containers including AM recovered from home RM {}.",
          response.getContainerList().size(), this.homeSubClusterId);

      LOG.info("In all {} UAMs {} running containers including AM recovered for {}.",
          uamMap.size(), containers, this.attemptId);

      if (queue != null) {
        // Initialize the AMRMProxyPolicy
        queue = this.amRegistrationResponse.getQueue();
        this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
            getConf(), this.federationFacade, this.homeSubClusterId);
      }
    } catch (IOException | YarnException e) {
      throw new YarnRuntimeException(e);
    }
  }

  /**
   * recover SubClusterAMRMTokenIdentifierMap.
   *
   * If registryClient is not empty, restore directly from registryClient,
   * otherwise restore from NMSS.
   *
   * @param recoveredDataMap recoveredDataMap.
   * @return subClusterAMRMTokenIdentifierMap.
   * @throws IOException IO Exception occurs.
   */
  private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdentifierMap(
      Map<String, byte[]> recoveredDataMap) throws IOException {
    Map<String, Token<AMRMTokenIdentifier>> uamMap;
    ApplicationId applicationId = this.attemptId.getApplicationId();
    if (this.registryClient != null) {
      uamMap = this.registryClient.loadStateFromRegistry(applicationId);
      LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",
          uamMap.size(), applicationId);
    } else {
      uamMap = recoverSubClusterAMRMTokenIdentifierMapFromNMSS(recoveredDataMap);
      LOG.info("Found {} existing UAMs for application {} in NMStateStore.",
          uamMap.size(), applicationId);
    }
    return uamMap;
  }

  /**
   * recover SubClusterAMRMTokenIdentifierMap From NMSS.
   *
   * @param recoveredDataMap recoveredDataMap
   * @return subClusterAMRMTokenIdentifierMap.
   * @throws IOException IO Exception occurs.
   */
  private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdentifierMapFromNMSS(
      Map<String, byte[]> recoveredDataMap) throws IOException {
    Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
    for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
      String key = entry.getKey();
      byte[] value = entry.getValue();

      if (key.startsWith(NMSS_SECONDARY_SC_PREFIX)) {
        // entry for subClusterId -> UAM AMRMTokenIdentifier
        String scId = key.substring(NMSS_SECONDARY_SC_PREFIX.length());
        Token<AMRMTokenIdentifier> aMRMTokenIdentifier = new Token<>();
        aMRMTokenIdentifier.decodeFromUrlString(new String(value, StandardCharsets.UTF_8));
        uamMap.put(scId, aMRMTokenIdentifier);
        LOG.debug("Recovered UAM in {} from NMSS.", scId);
      }
    }
    return uamMap;
  }

  /**
   * Sends the application master's registration request to the home RM.
   *
   * Between AM and AMRMProxy, FederationInterceptor modifies the RM behavior,
   * so that when AM registers more than once, it returns the same register
   * success response instead of throwing
   * {@link InvalidApplicationMasterRequestException}. Furthermore, we present
   * to AM as if we are the RM that never fails over (except when AMRMProxy
   * restarts). When actual RM fails over, we always re-register automatically.
   *
   * We did this because FederationInterceptor can receive concurrent register
   * requests from AM because of timeout between AM and AMRMProxy, which is
   * shorter than the timeout + failOver between FederationInterceptor
   * (AMRMProxy) and RM.
   *
   * For the same reason, this method needs to be synchronized.
   */
  @Override
  public synchronized RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) throws YarnException, IOException {

    if (request == null) {
      throw new YarnException("RegisterApplicationMasterRequest can't be null!");
    }

    // Reset the heartbeat responseId to zero upon register
    synchronized (this.lastAllocateResponseLock) {
      this.lastAllocateResponse.setResponseId(0);
    }
    this.justRecovered = false;

    // If AM is calling with a different request, complain
    if (this.amRegistrationRequest != null) {
      if (!this.amRegistrationRequest.equals(request)) {
        throw new YarnException("AM should not call "
            + "registerApplicationMaster with a different request body");
      }
    } else {
      // Save the registration request. This will be used for registering with
      // secondary sub-clusters using UAMs, as well as re-register later
      this.amRegistrationRequest = request;
      RegisterApplicationMasterRequestPBImpl requestPB = (RegisterApplicationMasterRequestPBImpl)
          this.amRegistrationRequest;
      storeAMRMProxyAppContextEntry(NMSS_REG_REQUEST_KEY, requestPB.getProto().toByteArray());
    }

    /*
     * Present to AM as if we are the RM that never fails over. When actual RM
     * fails over, we always re-register automatically.
     *
     * We did this because it is possible for AM to send duplicate register
     * request because of timeout. When it happens, it is fine to simply return
     * the success message. Out of all outstanding register threads, only the
     * last one will still have an unbroken RPC connection and successfully
     * return the response.
     */
    if (this.amRegistrationResponse != null) {
      return this.amRegistrationResponse;
    }

    /*
     * Send a registration request to the home resource manager. Note that here
     * we don't register with other sub-cluster resource managers because that
     * will prevent us from using new sub-clusters that get added while the AM
     * is running and will breaks the elasticity feature. The registration with
     * the other sub-cluster RM will be done lazily as needed later.
     */
    this.amRegistrationResponse = this.homeRMRelayer.registerApplicationMaster(request);

    if (this.amRegistrationResponse == null) {
      throw new YarnException("RegisterApplicationMasterResponse can't be null!");
    }

    List<Container> containersFromPreviousAttempts =
        this.amRegistrationResponse.getContainersFromPreviousAttempts();
    if (containersFromPreviousAttempts != null) {
      cacheAllocatedContainers(containersFromPreviousAttempts, this.homeSubClusterId);
    }

    ApplicationId appId = this.attemptId.getApplicationId();
    reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);

    RegisterApplicationMasterResponsePBImpl responsePB = (RegisterApplicationMasterResponsePBImpl)
        this.amRegistrationResponse;
    storeAMRMProxyAppContextEntry(NMSS_REG_RESPONSE_KEY, responsePB.getProto().toByteArray());

    // the queue this application belongs will be used for getting
    // AMRMProxy policy from state store.
    String queue = this.amRegistrationResponse.getQueue();
    if (queue == null) {
      LOG.warn("Received null queue for application {} from home subcluster. " +
          " Will use default queue name {} for getting AMRMProxyPolicy.", appId,
          YarnConfiguration.DEFAULT_QUEUE_NAME);
    } else {
      LOG.info("Application {} belongs to queue {}.", appId, queue);
    }

    // Initialize the AMRMProxyPolicy
    try {
      this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
          getConf(), this.federationFacade, this.homeSubClusterId);
    } catch (FederationPolicyInitializationException e) {
      throw new YarnRuntimeException(e);
    }
    return this.amRegistrationResponse;
  }

  /**
   * Add a context entry for an application attempt in AMRMProxyService.
   *
   * @param key key string
   * @param data state data
   */
  private void storeAMRMProxyAppContextEntry(String key, byte[] data) {
    NMStateStoreService nmStateStore = getNMStateStore();
    if (nmStateStore != null) {
      try {
        nmStateStore.storeAMRMProxyAppContextEntry(this.attemptId, key, data);
      } catch (Exception e) {
        LOG.error("Error storing AMRMProxy application context entry[{}] for {}.",
            key, this.attemptId, e);
      }
    }
  }

  /**
   * Sends the heart beats to the home RM and the secondary sub-cluster RMs that
   * are being used by the application.
   */
  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {
    Preconditions.checkArgument(this.policyInterpreter != null,
        "Allocate should be called after registerApplicationMaster");
    this.lastAMHeartbeatTime = this.clock.getTime();

    if (this.justRecovered) {
      throw new ApplicationMasterNotRegisteredException(
          "AMRMProxy just restarted and recovered for " + this.attemptId
              + ". AM should re-register and full re-send pending requests.");
    }

    if (this.finishAMCalled) {
      LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
          + "processing and return dummy response.", this.attemptId);
      return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
    }

    // Check responseId and handle duplicate heartbeat exactly same as RM
    synchronized (this.lastAllocateResponseLock) {
      LOG.info("Heartbeat from " + this.attemptId + " with responseId "
          + request.getResponseId() + " when we are expecting "
          + this.lastAllocateResponse.getResponseId());
      // Normally request.getResponseId() == lastResponse.getResponseId()
      if (AMRMClientUtils.getNextResponseId(
          request.getResponseId()) == this.lastAllocateResponse
              .getResponseId()) {
        // heartbeat one step old, simply return lastResponse
        return this.lastAllocateResponse;
      } else if (request.getResponseId() != this.lastAllocateResponse
          .getResponseId()) {
        throw new InvalidApplicationMasterRequestException(
            AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(attemptId,
                this.lastAllocateResponse.getResponseId(),
                request.getResponseId()));
      }
    }

    try {
      // Split the heart beat request into multiple requests, one for each
      // sub-cluster RM that is used by this application.
      Map<SubClusterId, AllocateRequest> requests =
          splitAllocateRequest(request);

      /**
       * Send the requests to the all sub-cluster resource managers. All
       * requests are synchronously triggered but sent asynchronously. Later the
       * responses will be collected and merged.
       */
      sendRequestsToResourceManagers(requests);

      // Wait for the first async response to arrive
      long startTime = this.clock.getTime();
      synchronized (this.asyncResponseSink) {
        try {
          this.asyncResponseSink.wait(this.heartbeatMaxWaitTimeMs);
        } catch (InterruptedException e) {
        }
      }
      long firstResponseTime = this.clock.getTime() - startTime;

      // An extra brief wait for other async heart beats, so that most of their
      // responses can make it back to AM in the same heart beat round trip.
      try {
        Thread.sleep(firstResponseTime);
      } catch (InterruptedException e) {
      }

      // Prepare the response to AM
      AllocateResponse response = generateBaseAllocationResponse();

      // Merge all responses from response sink
      mergeAllocateResponses(response);

      // Merge the containers and NMTokens from the new registrations into
      // the response

      if (!isNullOrEmpty(this.uamRegistrations)) {
        Map<SubClusterId, RegisterApplicationMasterResponse> newRegistrations;
        synchronized (this.uamRegistrations) {
          newRegistrations = new HashMap<>(this.uamRegistrations);
          this.uamRegistrations.clear();
        }
        mergeRegistrationResponses(response, newRegistrations);
      }

      // update the responseId and return the final response to AM
      synchronized (this.lastAllocateResponseLock) {
        response.setResponseId(AMRMClientUtils
            .getNextResponseId(this.lastAllocateResponse.getResponseId()));
        this.lastAllocateResponse = response;
      }
      return response;
    } catch (Throwable ex) {
      LOG.error("Exception encountered while processing heart beat for "
          + this.attemptId, ex);
      throw new YarnException(ex);
    }
  }

  /**
   * Sends the finish application master request to all the resource managers
   * used by the application.
   */
  @Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request)
      throws YarnException, IOException {

    this.finishAMCalled = true;

    boolean failedToUnRegister = false;

    // Application master is completing operation. Send the finish
    // application master request to all the registered sub-cluster resource
    // managers in parallel, wait for the responses and aggregate the results.
    Map<String, FinishApplicationMasterResponse> responseMap =
        this.uamPool.batchFinishApplicationMaster(request, attemptId.toString());

    for (Map.Entry<String, FinishApplicationMasterResponse> entry : responseMap.entrySet()) {
      String subClusterId = entry.getKey();
      FinishApplicationMasterResponse response = entry.getValue();
      if (response != null && response.getIsUnregistered()) {
        secondaryRelayers.remove(subClusterId);
        if (getNMStateStore() != null) {
          getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
              NMSS_SECONDARY_SC_PREFIX + subClusterId);
        }
      } else {
        // response is null or response.getIsUnregistered() == false
        failedToUnRegister = true;
      }
    }

    // While the finish application request is being processed
    // asynchronously by other sub-cluster resource managers, send the same
    // request to the home resource manager on this thread.
    FinishApplicationMasterResponse homeResponse =
        this.homeRMRelayer.finishApplicationMaster(request);

    // Stop the home heartbeat thread
    this.homeHeartbeatHandler.shutdown();

    if (failedToUnRegister) {
      homeResponse.setIsUnregistered(false);
    } else if (checkRequestFinalApplicationStatusSuccess(request)) {
      // Clean up UAMs only when the app finishes successfully, so that no more
      // attempt will be launched.
      this.uamPool.stop();
      removeAppFromRegistry();
    }
    return homeResponse;
  }

  private boolean checkRequestFinalApplicationStatusSuccess(
      FinishApplicationMasterRequest request) {
    if (request != null && request.getFinalApplicationStatus() != null) {
      return request.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED);
    }
    return false;
  }

  @Override
  public void setNextInterceptor(RequestInterceptor next) {
    throw new YarnRuntimeException(
        "setNextInterceptor is being called on FederationInterceptor. "
            + "It should always be used as the last interceptor in the chain");
  }

  /**
   * This is called when the application pipeline is being destroyed. We will
   * release all the resources that we are holding in this call.
   */
  @Override
  public void shutdown() {
    LOG.info("Shutting down FederationInterceptor for {}", this.attemptId);

    // Do not stop uamPool service and kill UAMs here because of possible second
    // app attempt
    try {
      this.uamPool.shutDownConnections();
    } catch (YarnException e) {
      LOG.error("Error shutting down all UAM clients without killing them", e);
    }

    if (this.threadpool != null) {
      try {
        this.threadpool.shutdown();
      } catch (Throwable ex) {
      }
      this.threadpool = null;
    }

    // Stop the home heartbeat thread
    this.homeHeartbeatHandler.shutdown();
    this.homeRMRelayer.shutdown();

    // Shutdown needs to clean up app
    removeAppFromRegistry();

    super.shutdown();
  }

  private void removeAppFromRegistry() {
    if (this.registryClient != null && this.attemptId != null) {
      ApplicationId applicationId = this.attemptId.getApplicationId();
      if (applicationId != null) {
        this.registryClient.removeAppFromRegistry(applicationId);
      }
    }
  }

  /**
   * Only for unit test cleanup.
   */
  @VisibleForTesting
  protected void cleanupRegistry() {
    if (this.registryClient != null) {
      this.registryClient.cleanAllApplications();
    }
  }

  @VisibleForTesting
  protected FederationRegistryClient getRegistryClient() {
    return this.registryClient;
  }

  @VisibleForTesting
  protected ApplicationAttemptId getAttemptId() {
    return this.attemptId;
  }

  @VisibleForTesting
  protected AMHeartbeatRequestHandler getHomeHeartbeatHandler() {
    return this.homeHeartbeatHandler;
  }

  /**
   * Create the UAM pool manager for secondary sub-clusters. For unit test to
   * override.
   *
   * @param threadPool the thread pool to use
   * @return the UAM pool manager instance
   */
  @VisibleForTesting
  protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
      ExecutorService threadPool) {
    return new UnmanagedAMPoolManager(threadPool);
  }

  @VisibleForTesting
  protected AMHeartbeatRequestHandler createHomeHeartbeatHandler(
      Configuration conf, ApplicationId appId,
      AMRMClientRelayer rmProxyRelayer) {
    return new AMHeartbeatRequestHandler(conf, appId, rmProxyRelayer);
  }

  /**
   * Create a proxy instance that is used to connect to the Home resource
   * manager.
   *
   * @param appContext AMRMProxyApplicationContext
   * @param protocol the protocol class for the proxy
   * @param user the ugi for the proxy
   * @param <T> the type of the proxy
   * @return the proxy created
   */
  protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
      Class<T> protocol, UserGroupInformation user) {
    try {
      return FederationProxyProviderUtil.createRMProxy(appContext.getConf(),
          protocol, this.homeSubClusterId, user, appContext.getAMRMToken());
    } catch (Exception ex) {
      throw new YarnRuntimeException(ex);
    }
  }

  private void mergeRegisterResponse(
      RegisterApplicationMasterResponse homeResponse,
      RegisterApplicationMasterResponse otherResponse) {

    if (!isNullOrEmpty(otherResponse.getContainersFromPreviousAttempts())) {
      if (!isNullOrEmpty(homeResponse.getContainersFromPreviousAttempts())) {
        homeResponse.getContainersFromPreviousAttempts()
            .addAll(otherResponse.getContainersFromPreviousAttempts());
      } else {
        homeResponse.setContainersFromPreviousAttempts(
            otherResponse.getContainersFromPreviousAttempts());
      }
    }

    if (!isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) {
      if (!isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) {
        homeResponse.getNMTokensFromPreviousAttempts()
            .addAll(otherResponse.getNMTokensFromPreviousAttempts());
      } else {
        homeResponse.setNMTokensFromPreviousAttempts(
            otherResponse.getNMTokensFromPreviousAttempts());
      }
    }
  }

  /**
   * Try re-attach to all existing and running UAMs in secondary sub-clusters
   * launched by previous application attempts if any. All running containers in
   * the UAMs will be combined into the registerResponse. For the first attempt,
   * the registry will be empty for this application and thus no-op here.
   */
  protected void reAttachUAMAndMergeRegisterResponse(
      RegisterApplicationMasterResponse homeResponse,
      final ApplicationId appId) {

    if (this.registryClient == null) {
      // Both AMRMProxy HA and NM work preserving restart is not enabled
      LOG.warn("registryClient is null, skip attaching existing UAM if any");
      return;
    }

    // Load existing running UAMs from the previous attempts from
    // registry, if any
    Map<String, Token<AMRMTokenIdentifier>> uamMap =
        this.registryClient.loadStateFromRegistry(appId);
    if (uamMap.size() == 0) {
      LOG.info("No existing UAM for application {} found in Yarn Registry",
          appId);
      return;
    }
    LOG.info("Found {} existing UAMs for application {} in Yarn Registry. "
        + "Reattaching in parallel", uamMap.size(), appId);

    ExecutorCompletionService<RegisterApplicationMasterResponse>
        completionService = new ExecutorCompletionService<>(this.threadpool);

    for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
      final SubClusterId subClusterId =
          SubClusterId.newInstance(entry.getKey());
      final Token<AMRMTokenIdentifier> amrmToken = entry.getValue();

      completionService
          .submit(() -> {
            RegisterApplicationMasterResponse response = null;
            try {
              // Create a config loaded with federation on and subclusterId
              // for each UAM
              YarnConfiguration config = new YarnConfiguration(getConf());
              FederationProxyProviderUtil.updateConfForFederation(config,
                  subClusterId.getId());

              ApplicationSubmissionContext originalSubmissionContext =
                  federationFacade.getApplicationSubmissionContext(appId);

              uamPool.reAttachUAM(subClusterId.getId(), config, appId,
                  amRegistrationResponse.getQueue(),
                  getApplicationContext().getUser(), homeSubClusterId.getId(),
                  amrmToken, subClusterId.toString(), originalSubmissionContext);

              secondaryRelayers.put(subClusterId.getId(),
                  uamPool.getAMRMClientRelayer(subClusterId.getId()));

              response = uamPool.registerApplicationMaster(subClusterId.getId(),
                  amRegistrationRequest);

              // Set sub-cluster to be timed out initially
              lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);

              if (response != null && response.getContainersFromPreviousAttempts() != null) {
                cacheAllocatedContainers(response.getContainersFromPreviousAttempts(),
                    subClusterId);
              }
              LOG.info("UAM {} reattached for {}", subClusterId, appId);
            } catch (Throwable e) {
              LOG.error("Reattaching UAM {} failed for {}.", subClusterId, appId, e);
            }
            return response;
          });
    }

    // Wait for the re-attach responses
    for (int i = 0; i < uamMap.size(); i++) {
      try {
        Future<RegisterApplicationMasterResponse> future =
            completionService.take();
        RegisterApplicationMasterResponse registerResponse = future.get();
        if (registerResponse != null) {
          LOG.info("Merging register response for {}", appId);
          mergeRegisterResponse(homeResponse, registerResponse);
          nmTokenMapFromRegisterSecondaryCluster.addAll(
              registerResponse.getNMTokensFromPreviousAttempts());
        }
      } catch (Exception e) {
        LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e);
      }
    }
  }

  private SubClusterId getSubClusterForNode(String nodeName) {
    SubClusterId subClusterId;
    try {
      subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName);
    } catch (YarnException e) {
      LOG.error("Failed to resolve sub-cluster for node " + nodeName
          + ", skipping this node", e);
      return null;
    }
    if (subClusterId == null) {
      LOG.error("Failed to resolve sub-cluster for node {}, skipping this node",
          nodeName);
      return null;
    }
    return subClusterId;
  }

  /**
   * In federation, the heart beat request needs to be sent to all the sub
   * clusters from which the AM has requested containers. This method splits the
   * specified AllocateRequest from the AM and creates a new request for each
   * sub-cluster RM.
   */
  private Map<SubClusterId, AllocateRequest> splitAllocateRequest(
      AllocateRequest request) throws YarnException {
    Map<SubClusterId, AllocateRequest> requestMap = new HashMap<>();

    // Create heart beat request for home sub-cluster resource manager
    findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
        requestMap);

    // Create heart beat request instances for all other already registered
    // sub-cluster resource managers
    Set<String> subClusterIds = this.uamPool.getAllUAMIds();
    for (String subClusterId : subClusterIds) {
      findOrCreateAllocateRequestForSubCluster(
          SubClusterId.newInstance(subClusterId), request, requestMap);
    }

    if (!isNullOrEmpty(request.getAskList())) {
      // Ask the federation policy interpreter to split the ask list for
      // sending it to all the sub-cluster resource managers.
      Map<SubClusterId, List<ResourceRequest>> asks =
          splitResourceRequests(request.getAskList());

      // Add the askLists to the corresponding sub-cluster requests.
      for (Entry<SubClusterId, List<ResourceRequest>> entry : asks.entrySet()) {
        AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
            entry.getKey(), request, requestMap);
        newRequest.getAskList().addAll(entry.getValue());
      }
    }

    if (request.getResourceBlacklistRequest() != null) {
      if (!isNullOrEmpty(
          request.getResourceBlacklistRequest().getBlacklistAdditions())) {
        for (String resourceName : request.getResourceBlacklistRequest()
            .getBlacklistAdditions()) {
          SubClusterId subClusterId = getSubClusterForNode(resourceName);
          if (subClusterId != null) {
            AllocateRequest newRequest =
                findOrCreateAllocateRequestForSubCluster(subClusterId, request,
                    requestMap);
            newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
                .add(resourceName);
          }
        }
      }
      if (!isNullOrEmpty(
          request.getResourceBlacklistRequest().getBlacklistRemovals())) {
        for (String resourceName : request.getResourceBlacklistRequest()
            .getBlacklistRemovals()) {
          SubClusterId subClusterId = getSubClusterForNode(resourceName);
          if (subClusterId != null) {
            AllocateRequest newRequest =
                findOrCreateAllocateRequestForSubCluster(subClusterId, request,
                    requestMap);
            newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
                .add(resourceName);
          }
        }
      }
    }

    if (!isNullOrEmpty(request.getReleaseList())) {
      for (ContainerId cid : request.getReleaseList()) {
        if (warnIfNotExists(cid, "release")) {
          SubClusterId subClusterId =
              this.containerIdToSubClusterIdMap.get(cid);
          AllocateRequest newRequest = requestMap.get(subClusterId);
          newRequest.getReleaseList().add(cid);
        }
      }
    }

    if (!isNullOrEmpty(request.getUpdateRequests())) {
      for (UpdateContainerRequest ucr : request.getUpdateRequests()) {
        if (warnIfNotExists(ucr.getContainerId(), "update")) {
          SubClusterId subClusterId =
              this.containerIdToSubClusterIdMap.get(ucr.getContainerId());
          AllocateRequest newRequest = requestMap.get(subClusterId);
          newRequest.getUpdateRequests().add(ucr);
        }
      }
    }

    return requestMap;
  }

  /**
   * This methods sends the specified AllocateRequests to the appropriate
   * sub-cluster resource managers asynchronously.
   *
   * @param requests contains the heart beat requests to send to the resource
   *          manager keyed by the sub-cluster id
   * @throws YarnException exceptions from yarn servers.
   * @throws IOException an I/O exception of some sort has occurred.
   */
  private void sendRequestsToResourceManagers(
      Map<SubClusterId, AllocateRequest> requests)
      throws YarnException, IOException {

    // Create new UAM instances for the sub-cluster that we haven't seen before
    List<SubClusterId> newSubClusters =
        registerAndAllocateWithNewSubClusters(requests);

    // Now that all the registrations are done, send the allocation request
    // to the sub-cluster RMs asynchronously and don't wait for the response.
    // The responses will arrive asynchronously and will be added to the
    // response sink, then merged and sent to the application master.
    for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
      SubClusterId subClusterId = entry.getKey();
      if (newSubClusters.contains(subClusterId)) {
        // For new sub-clusters, we have already sent the request right after
        // register in the async thread
        continue;
      }

      if (subClusterId.equals(this.homeSubClusterId)) {
        // Request for the home sub-cluster resource manager
        this.homeHeartbeatHandler.allocateAsync(entry.getValue(),
            new HeartbeatCallBack(this.homeSubClusterId, false));
      } else {
        if (!this.uamPool.hasUAMId(subClusterId.getId())) {
          throw new YarnException("UAM not found for " + this.attemptId
              + " in sub-cluster " + subClusterId);
        }
        this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
            new HeartbeatCallBack(subClusterId, true));
      }
    }
  }

  /**
   * This method ensures that Unmanaged AMs are created for newly specified
   * sub-clusters, registers with the corresponding resource managers and send
   * the first allocate request async.
   */
  private List<SubClusterId> registerAndAllocateWithNewSubClusters(
      final Map<SubClusterId, AllocateRequest> requests) throws IOException {

    // Check to see if there are any new sub-clusters in this request
    // list and create and register Unmanaged AM instance for the new ones
    List<SubClusterId> newSubClusters = new ArrayList<>();

    requests.keySet().forEach(subClusterId -> {
      String id = subClusterId.getId();
      if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) {
        newSubClusters.add(subClusterId);
        // Set sub-cluster to be timed out initially
        lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
      }
    });

    this.uamRegisterFutures.clear();

    for (final SubClusterId scId : newSubClusters) {

      Future<?> future = this.threadpool.submit(() -> {

        String subClusterId = scId.getId();

        // Create a config loaded with federation on and subclusterId
        // for each UAM
        YarnConfiguration config = new YarnConfiguration(getConf());
        FederationProxyProviderUtil.updateConfForFederation(config, subClusterId);
        ApplicationId applicationId = attemptId.getApplicationId();

        RegisterApplicationMasterResponse uamResponse;
        Token<AMRMTokenIdentifier> token;

        // LaunchUAM And RegisterApplicationMaster
        try {
          TokenAndRegisterResponse result =
              ((FederationActionRetry<TokenAndRegisterResponse>) (retryCount) ->
              launchUAMAndRegisterApplicationMaster(config, subClusterId, applicationId)).
              runWithRetries(registerUamRetryNum, registerUamRetryInterval);

          token = result.getToken();
          uamResponse = result.getResponse();
        } catch (Throwable e) {
          LOG.error("Failed to register application master: {} Application: {}.",
              subClusterId, attemptId, e);
          return;
        }

        uamRegistrations.put(scId, uamResponse);

        LOG.info("Successfully registered unmanaged application master: {} " +
            "ApplicationId: {}.", subClusterId, attemptId);

        // Allocate Request
        try {
          uamPool.allocateAsync(subClusterId, requests.get(scId),
              new HeartbeatCallBack(scId, true));
        } catch (Throwable e) {
          LOG.error("Failed to allocate async to {} Application: {}.",
              subClusterId, attemptId, e);
        }

        // Save the UAM token in registry or NMSS
        try {
          if (registryClient != null) {
            registryClient.writeAMRMTokenForUAM(applicationId, subClusterId, token);
          } else if (getNMStateStore() != null) {
            getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
                NMSS_SECONDARY_SC_PREFIX + subClusterId,
                token.encodeToUrlString().getBytes(StandardCharsets.UTF_8));
          }
        } catch (Throwable e) {
          LOG.error("Failed to persist UAM token from {} Application {}",
              subClusterId, attemptId, e);
        }
      });

      this.uamRegisterFutures.put(scId, future);
    }

    if (this.waitUamRegisterDone) {
      for (Map.Entry<SubClusterId, Future<?>> entry : this.uamRegisterFutures.entrySet()) {
        SubClusterId subClusterId = entry.getKey();
        Future<?> future = entry.getValue();
        while (!future.isDone()) {
          LOG.info("subClusterId {} Wait Uam Register done.", subClusterId);
        }
      }
    }

    return newSubClusters;
  }

  protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(
      YarnConfiguration config, String subClusterId, ApplicationId applicationId)
      throws IOException, YarnException {

    // Prepare parameter information
    ApplicationSubmissionContext originalSubmissionContext =
        federationFacade.getApplicationSubmissionContext(applicationId);
    String submitter = getApplicationContext().getUser();
    String homeRM = homeSubClusterId.toString();
    String queue = amRegistrationResponse.getQueue();

    // For appNameSuffix, use subClusterId of the home sub-cluster
    Token<AMRMTokenIdentifier> token = uamPool.launchUAM(subClusterId, config, applicationId,
        queue, submitter, homeRM, true, subClusterId, originalSubmissionContext);

    // Set the relationship between SubCluster and AMRMClientRelayer.
    secondaryRelayers.put(subClusterId, uamPool.getAMRMClientRelayer(subClusterId));

    // RegisterApplicationMaster
    RegisterApplicationMasterResponse uamResponse =
        uamPool.registerApplicationMaster(subClusterId, amRegistrationRequest);

    return new TokenAndRegisterResponse(token, uamResponse);
  }

  /**
   * Prepare the base allocation response. Use lastSCResponse and
   * lastHeartbeatTimeStamp to assemble entries about cluster-wide info, e.g.
   * AvailableResource, NumClusterNodes.
   */
  protected AllocateResponse generateBaseAllocationResponse() {
    AllocateResponse baseResponse =
        RECORD_FACTORY.newRecordInstance(AllocateResponse.class);

    baseResponse.setAvailableResources(Resource.newInstance(0, 0));
    baseResponse.setNumClusterNodes(0);

    Set<SubClusterId> expiredSC = getTimedOutSCs(false);
    for (Entry<SubClusterId, AllocateResponse> entry : lastSCResponse
        .entrySet()) {
      if (expiredSC.contains(entry.getKey())) {
        // Skip expired sub-clusters
        continue;
      }
      AllocateResponse response = entry.getValue();

      if (response.getAvailableResources() != null) {
        baseResponse.setAvailableResources(
            Resources.add(baseResponse.getAvailableResources(),
                response.getAvailableResources()));
      }
      baseResponse.setNumClusterNodes(
          baseResponse.getNumClusterNodes() + response.getNumClusterNodes());
    }
    return baseResponse;
  }

  /**
   * Merge the responses from all sub-clusters that we received asynchronously
   * and keeps track of the containers received from each sub-cluster resource
   * managers.
   */
  private void mergeAllocateResponses(AllocateResponse mergedResponse) {
    synchronized (this.asyncResponseSink) {
      for (Entry<SubClusterId, List<AllocateResponse>> entry :
          this.asyncResponseSink.entrySet()) {
        SubClusterId subClusterId = entry.getKey();
        List<AllocateResponse> responses = entry.getValue();
        if (responses.size() > 0) {
          for (AllocateResponse response : responses) {
            removeFinishedContainersFromCache(
                response.getCompletedContainersStatuses());
            cacheAllocatedContainers(response.getAllocatedContainers(),
                subClusterId);
            mergeAllocateResponse(mergedResponse, response, subClusterId);
          }
          responses.clear();
        }
      }
    }
    // When re-register RM, client may not cache the NMToken from register response.
    // Here we pass these NMToken in allocate stage.
    if (nmTokenMapFromRegisterSecondaryCluster.size() > 0) {
      List<NMToken> duplicateNmToken = new ArrayList(nmTokenMapFromRegisterSecondaryCluster);
      nmTokenMapFromRegisterSecondaryCluster.removeAll(duplicateNmToken);
      if (!isNullOrEmpty(mergedResponse.getNMTokens())) {
        mergedResponse.getNMTokens().addAll(duplicateNmToken);
      } else {
        mergedResponse.setNMTokens(duplicateNmToken);
      }
    }
  }

  /**
   * Removes the finished containers from the local cache.
   */
  private void removeFinishedContainersFromCache(
      List<ContainerStatus> finishedContainers) {
    for (ContainerStatus container : finishedContainers) {
      LOG.debug("Completed container {}", container);
      containerIdToSubClusterIdMap.remove(container.getContainerId());
    }
  }

  /**
   * Helper method for merging the registration responses from the secondary sub
   * cluster RMs into the allocate response to return to the AM.
   */
  private void mergeRegistrationResponses(AllocateResponse homeResponse,
      Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {

    for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
        registrations.entrySet()) {
      RegisterApplicationMasterResponse registration = entry.getValue();

      if (!isNullOrEmpty(registration.getContainersFromPreviousAttempts())) {
        List<Container> tempContainers = homeResponse.getAllocatedContainers();
        if (!isNullOrEmpty(tempContainers)) {
          tempContainers
              .addAll(registration.getContainersFromPreviousAttempts());
          homeResponse.setAllocatedContainers(tempContainers);
        } else {
          homeResponse.setAllocatedContainers(
              registration.getContainersFromPreviousAttempts());
        }
        cacheAllocatedContainers(
            registration.getContainersFromPreviousAttempts(), entry.getKey());
      }

      if (!isNullOrEmpty(registration.getNMTokensFromPreviousAttempts())) {
        List<NMToken> tempTokens = homeResponse.getNMTokens();
        if (!isNullOrEmpty(tempTokens)) {
          tempTokens.addAll(registration.getNMTokensFromPreviousAttempts());
          homeResponse.setNMTokens(tempTokens);
        } else {
          homeResponse
              .setNMTokens(registration.getNMTokensFromPreviousAttempts());
        }
      }
    }
  }

  @VisibleForTesting
  protected void mergeAllocateResponse(AllocateResponse homeResponse,
      AllocateResponse otherResponse, SubClusterId otherRMAddress) {

    if (otherResponse.getAMRMToken() != null) {
      // Propagate only the new amrmToken from home sub-cluster back to
      // AMRMProxyService
      if (otherRMAddress.equals(this.homeSubClusterId)) {
        homeResponse.setAMRMToken(otherResponse.getAMRMToken());
      } else {
        LOG.warn("amrmToken from UAM {} not null, it should be null here",
            otherRMAddress);
      }
    }

    if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
      if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
        homeResponse.getAllocatedContainers()
            .addAll(otherResponse.getAllocatedContainers());
      } else {
        homeResponse
            .setAllocatedContainers(otherResponse.getAllocatedContainers());
      }
    }

    if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
      if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
        homeResponse.getCompletedContainersStatuses()
            .addAll(otherResponse.getCompletedContainersStatuses());
      } else {
        homeResponse.setCompletedContainersStatuses(
            otherResponse.getCompletedContainersStatuses());
      }
    }

    if (!isNullOrEmpty(otherResponse.getUpdatedNodes())) {
      if (!isNullOrEmpty(homeResponse.getUpdatedNodes())) {
        homeResponse.getUpdatedNodes().addAll(otherResponse.getUpdatedNodes());
      } else {
        homeResponse.setUpdatedNodes(otherResponse.getUpdatedNodes());
      }
    }

    if (otherResponse.getApplicationPriority() != null) {
      homeResponse.setApplicationPriority(
          otherResponse.getApplicationPriority());
    }

    homeResponse.setNumClusterNodes(
        homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes());

    PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
    PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();

    if (homePreempMessage == null && otherPreempMessage != null) {
      homeResponse.setPreemptionMessage(otherPreempMessage);
    }

    if (homePreempMessage != null && otherPreempMessage != null) {
      PreemptionContract par1 = homePreempMessage.getContract();
      PreemptionContract par2 = otherPreempMessage.getContract();

      if (par1 == null && par2 != null) {
        homePreempMessage.setContract(par2);
      }

      if (par1 != null && par2 != null) {
        par1.getResourceRequest().addAll(par2.getResourceRequest());
        par1.getContainers().addAll(par2.getContainers());
      }

      StrictPreemptionContract spar1 = homePreempMessage.getStrictContract();
      StrictPreemptionContract spar2 = otherPreempMessage.getStrictContract();

      if (spar1 == null && spar2 != null) {
        homePreempMessage.setStrictContract(spar2);
      }

      if (spar1 != null && spar2 != null) {
        spar1.getContainers().addAll(spar2.getContainers());
      }
    }

    if (!isNullOrEmpty(otherResponse.getNMTokens())) {
      if (!isNullOrEmpty(homeResponse.getNMTokens())) {
        homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
      } else {
        homeResponse.setNMTokens(otherResponse.getNMTokens());
      }
    }

    if (!isNullOrEmpty(otherResponse.getUpdatedContainers())) {
      if (!isNullOrEmpty(homeResponse.getUpdatedContainers())) {
        homeResponse.getUpdatedContainers()
            .addAll(otherResponse.getUpdatedContainers());
      } else {
        homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers());
      }
    }

    if (!isNullOrEmpty(otherResponse.getUpdateErrors())) {
      if (!isNullOrEmpty(homeResponse.getUpdateErrors())) {
        homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors());
      } else {
        homeResponse.setUpdateErrors(otherResponse.getUpdateErrors());
      }
    }
  }

  /**
   * Add allocated containers to cache mapping.
   */
  private void cacheAllocatedContainers(List<Container> containers,
      SubClusterId subClusterId) {
    for (Container container : containers) {
      SubClusterId chooseSubClusterId = SubClusterId.newInstance(subClusterId.toString());
      LOG.debug("Adding container {}", container);

      if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) {
        SubClusterId existingSubClusterId =
            this.containerIdToSubClusterIdMap.get(container.getId());
        if (existingSubClusterId.equals(subClusterId)) {
          /*
           * When RM fails over, the new RM master might send out the same
           * container allocation more than once.
           *
           * It is also possible because of a recent NM restart with NM recovery
           * enabled. We recover running containers from RM. But RM might not
           * notified AM of some of these containers yet. When RM dose notify,
           * we will already have these containers in the map.
           *
           * Either case, just warn and move on.
           */
          LOG.warn(
              "Duplicate containerID: {} found in the allocated containers"
                  + " from same sub-cluster: {}, so ignoring.",
              container.getId(), subClusterId);
        } else {

          LOG.info("Duplicate containerID found in the allocated containers. " +
              "try to re-pick the sub-cluster.");

          // The same container allocation from different sub-clusters,
          // something is wrong.
          try {

            boolean existAllocatedScHealth = isSCHealth(existingSubClusterId);
            boolean newAllocatedScHealth = isSCHealth(subClusterId);

            if (existAllocatedScHealth) {
              // If the previous RM which allocated Container is normal,
              // the previous RM will be used first
              LOG.info("Use Previous Allocated Container's subCluster. " +
                  "ContainerId: {} ApplicationId: {} From RM: {}.", this.attemptId,
                  container.getId(), existingSubClusterId);
              chooseSubClusterId = existingSubClusterId;
            } else if (newAllocatedScHealth) {
              // If the previous RM which allocated Container is abnormal,
              // but the RM of the newly allocated Container is normal, use the new RM
              LOG.info("Use Newly Allocated Container's subCluster. " +
                  "ApplicationId: {} ContainerId: {} From RM: {}.", this.attemptId,
                  container.getId(), subClusterId);
              chooseSubClusterId = subClusterId;
            } else {
              // There is a very small probability that an exception will be thrown.
              // The RM of the previously allocated Container
              // and the RM of the newly allocated Container are not normal.
              throw new YarnRuntimeException(
                  " Can't use any subCluster because an exception occurred" +
                  " ContainerId: " + container.getId() + " ApplicationId: " + this.attemptId +
                  " From RM: " + subClusterId + ". " +
                  " Previous Container was From subCluster: " + existingSubClusterId);
            }
          } catch (Exception ex) {
            // An exception occurred
            throw new YarnRuntimeException(
                " Can't use any subCluster because an exception occurred" +
                " ContainerId: " + container.getId() + " ApplicationId: " + this.attemptId +
                " From RM: " + subClusterId + ". " +
                " Previous Container was From subCluster: " + existingSubClusterId, ex);
          }
        }
      }

      this.containerIdToSubClusterIdMap.put(container.getId(), chooseSubClusterId);
    }
  }

  /**
   * Check to see if an AllocateRequest exists in the Map for the specified sub
   * cluster. If not found, create a new one, copy the value of responseId and
   * progress from the originalAMRequest, save it in the specified Map and
   * return the new instance. If found, just return the old instance.
   */
  private static AllocateRequest findOrCreateAllocateRequestForSubCluster(
      SubClusterId subClusterId, AllocateRequest originalAMRequest,
      Map<SubClusterId, AllocateRequest> requestMap) {
    AllocateRequest newRequest;
    if (requestMap.containsKey(subClusterId)) {
      newRequest = requestMap.get(subClusterId);
    } else {
      newRequest = createAllocateRequest();
      newRequest.setResponseId(originalAMRequest.getResponseId());
      newRequest.setProgress(originalAMRequest.getProgress());
      requestMap.put(subClusterId, newRequest);
    }
    return newRequest;
  }

  /**
   * Create an empty AllocateRequest instance.
   */
  private static AllocateRequest createAllocateRequest() {
    AllocateRequest request =
        RECORD_FACTORY.newRecordInstance(AllocateRequest.class);
    request.setAskList(new ArrayList<>());
    request.setReleaseList(new ArrayList<>());
    ResourceBlacklistRequest blackList =
        ResourceBlacklistRequest.newInstance(null, null);
    blackList.setBlacklistAdditions(new ArrayList<>());
    blackList.setBlacklistRemovals(new ArrayList<>());
    request.setResourceBlacklistRequest(blackList);
    request.setUpdateRequests(new ArrayList<>());
    return request;
  }

  protected Set<SubClusterId> getTimedOutSCs(boolean verbose) {
    Set<SubClusterId> timedOutSCs = new HashSet<>();
    for (Map.Entry<SubClusterId, Long> entry : this.lastSCResponseTime
        .entrySet()) {
      if (entry.getValue() > this.lastAMHeartbeatTime) {
        // AM haven't heartbeat to us (and thus we to all SCs) for a long time,
        // should not consider the SC as timed out
        continue;
      }
      long duration = this.clock.getTime() - entry.getValue();
      if (duration > this.subClusterTimeOut) {
        if (verbose) {
          LOG.warn("Subcluster {} doesn't have a successful heartbeat for {} seconds for {}",
              entry.getKey(), (double) duration / 1000, this.attemptId);
        }
        timedOutSCs.add(entry.getKey());
      }
    }
    return timedOutSCs;
  }

  /**
   * Check to see if the specified containerId exists in the cache and log an
   * error if not found.
   *
   * @param containerId the container id
   * @param actionName the name of the action
   * @return true if the container exists in the map, false otherwise
   */
  private boolean warnIfNotExists(ContainerId containerId, String actionName) {
    if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
      LOG.error(
          "AM is trying to {} a container {} that does not exist. Might happen "
              + "shortly after NM restart when NM recovery is enabled",
          actionName, containerId.toString());
      return false;
    }
    return true;
  }

  /**
   * Splits the specified request to send it to different sub clusters. The
   * splitting algorithm is very simple. If the request does not have a node
   * preference, the policy decides the sub cluster. If the request has a node
   * preference and if locality is required, then it is sent to the sub cluster
   * that contains the requested node. If node preference is specified and
   * locality is not required, then the policy decides the sub cluster.
   *
   * @param askList the ask list to split
   * @return the split asks
   * @throws YarnException if split fails
   */
  protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
      List<ResourceRequest> askList) throws YarnException {
    return policyInterpreter.splitResourceRequests(askList,
        getTimedOutSCs(true));
  }

  @VisibleForTesting
  protected int getUnmanagedAMPoolSize() {
    return this.uamPool.getAllUAMIds().size();
  }

  @VisibleForTesting
  protected UnmanagedAMPoolManager getUnmanagedAMPool() {
    return this.uamPool;
  }

  @VisibleForTesting
  protected Map<SubClusterId, Future<?>> getUamRegisterFutures() {
    return this.uamRegisterFutures;
  }

  @VisibleForTesting
  public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
    return this.asyncResponseSink;
  }

  /**
   * Async callback handler for heart beat response from all sub-clusters.
   */
  private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
    private final SubClusterId subClusterId;
    private final boolean isUAM;

    HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) {
      this.subClusterId = subClusterId;
      this.isUAM = isUAM;
    }

    @Override
    public void callback(AllocateResponse response) {
      org.apache.hadoop.yarn.api.records.Token amrmToken =
          response.getAMRMToken();
      synchronized (asyncResponseSink) {
        List<AllocateResponse> responses;
        if (asyncResponseSink.containsKey(subClusterId)) {
          responses = asyncResponseSink.get(subClusterId);
        } else {
          responses = new ArrayList<>();
          asyncResponseSink.put(subClusterId, responses);
        }
        responses.add(response);

        if (this.isUAM) {
          // Do not further propagate the new amrmToken for UAM
          response.setAMRMToken(null);
        }
        // Notify main thread about the response arrival
        asyncResponseSink.notifyAll();
      }
      lastSCResponse.put(subClusterId, response);
      lastSCResponseTime.put(subClusterId, clock.getTime());

      // Notify policy of allocate response
      try {
        policyInterpreter.notifyOfResponse(subClusterId, response);
      } catch (YarnException e) {
        LOG.warn("notifyOfResponse for policy failed for sub-cluster {}.", subClusterId, e);
      }

      // Save the new AMRMToken for the UAM if present
      // Do this last because it can be slow...
      if (this.isUAM && amrmToken != null) {
        Token<AMRMTokenIdentifier> newToken = ConverterUtils
            .convertFromYarn(amrmToken, (Text) null);
        // Do not further propagate the new amrmToken for UAM
        response.setAMRMToken(null);

        // Update the token in registry or NMSS
        if (registryClient != null) {
          if (registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
              subClusterId.getId(), newToken)) {
            try {
              AMRMTokenIdentifier identifier = new AMRMTokenIdentifier();
              identifier.readFields(new DataInputStream(
                  new ByteArrayInputStream(newToken.getIdentifier())));
              LOG.info("Received new UAM amrmToken with keyId {} and service {} from {} for {}, " +
                  "written to Registry", identifier.getKeyId(), newToken.getService(),
                  subClusterId, attemptId);
            } catch (IOException e) {
            }
          }
        } else if (getNMStateStore() != null) {
          try {
            getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
                NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
                newToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8));
          } catch (IOException e) {
            LOG.error("Error storing UAM token as AMRMProxy "
                + "context entry in NMSS for {}.", attemptId, e);
          }
        }
      }
    }
  }

  /**
   * Private structure for encapsulating SubClusterId and
   * FinishApplicationMasterResponse instances.
   */
  private static class FinishApplicationMasterResponseInfo {
    private final FinishApplicationMasterResponse response;
    private final String subClusterId;

    FinishApplicationMasterResponseInfo(
        FinishApplicationMasterResponse response, String subClusterId) {
      this.response = response;
      this.subClusterId = subClusterId;
    }

    public FinishApplicationMasterResponse getResponse() {
      return response;
    }

    public String getSubClusterId() {
      return subClusterId;
    }
  }

  /**
   * Utility method to check if the specified Collection is null or empty.
   *
   * @param c the collection object
   * @param <T> element type of the collection
   * @return whether is it is null or empty
   */
  public static <T> boolean isNullOrEmpty(Collection<T> c) {
    return (c == null || c.size() == 0);
  }

  /**
   * Utility method to check if the specified Collection is null or empty.
   *
   * @param c the map object
   * @param <T1> key type of the map
   * @param <T2> value type of the map
   * @return whether is it is null or empty
   */
  public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) {
    return (c == null || c.size() == 0);
  }

  @VisibleForTesting
  protected void cacheAllocatedContainersForSubClusterId(
      List<Container> containers, SubClusterId subClusterId) {
    cacheAllocatedContainers(containers, subClusterId);
  }

  @VisibleForTesting
  protected Map<ContainerId, SubClusterId> getContainerIdToSubClusterIdMap() {
    return containerIdToSubClusterIdMap;
  }

  private boolean isSCHealth(SubClusterId subClusterId) throws YarnException {
    Set<SubClusterId> timeOutScs = getTimedOutSCs(true);
    SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
    if (timeOutScs.contains(subClusterId) ||
        subClusterInfo == null || !subClusterInfo.getState().isUsable()) {
      return false;
    }
    return true;
  }
}
